[SPARK-28366][CORE] Logging in driver when loading single large unsplittable file #25134
Conversation
Test build #107594 has finished for PR 25134 at commit
I think the problem is that it's pretty commonly known that an unsplittable codec can't be processed in parallel.
Spark itself writes multiple files from each partition, so Spark users arguably won't hit this issue often. So this logging mostly applies to the case where we read a big file from an external source.
If we should add the logging here, we should add warnings in other places too. For instance, the multiLine option for CSV and JSON.
@HyukjinKwon Yea, but some users complain that loading a large unsplittable file without any logging is confusing for them...
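For illustration (a hedged sketch; the file path is made up), this is the behavior such users run into: an unsplittable codec like gzip always collapses to a single partition, no matter how much parallelism is requested.

```scala
// Illustrative only: gzip is an unsplittable codec, so the whole file becomes
// one partition even though 16 partitions were requested.
val rdd = sc.textFile("/data/big-log.txt.gz", minPartitions = 16)
println(rdd.getNumPartitions) // prints 1
rdd.count() // the entire file is read by a single task
```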
Test build #107749 has finished for PR 25134 at commit
Test build #107751 has finished for PR 25134 at commit
core/src/main/scala/org/apache/spark/internal/config/package.scala
@HyukjinKwon Now I have handled all the cases in which the file is unsplittable.
Test build #107768 has finished for PR 25134 at commit
Test build #107812 has finished for PR 25134 at commit
if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
  val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
  val path = fileSplit.getPath
  if (Utils.isFileSplittable(path, codecFactory)
do we really need to know if it's splittable or not? If Spark is scanning files with a single giant partition, it's going to be very slow.
@cloud-fan Yes. But we'd better tell the user why it generates only one partition. So I prefer (a rough sketch follows below):
- If the file is unsplittable, tell the user in the log that the file is unsplittable (and include the reason it is unsplittable)
- If the file is splittable, tell the user in the log that parallelism can be increased by setting the `minPartitions` argument in method `sc.textFile`

What do you think?
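Roughly like this (a sketch using the names that appear in this PR's diffs; the exact messages may differ):

```scala
// Sketch of the two proposed branches (illustrative wording, not the final patch).
if (Utils.isFileSplittable(path, codecFactory)) {
  // Splittable: hint at how to raise parallelism.
  logWarning(s"Loading one large file $path with only one partition, " +
    "we can increase partition numbers by the `minPartitions` argument in method `sc.textFile`")
} else {
  // Unsplittable: include the reason it cannot be split.
  logWarning(s"Loading one large unsplittable file $path with only one partition, " +
    "because the file is compressed by unsplittable compression codec")
}
```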
@cloud-fan Any thoughts?
SGTM, let's include the reason in the message.
Test build #107840 has finished for PR 25134 at commit
Retest this please.
Test build #108025 has finished for PR 25134 at commit
Test build #108029 has finished for PR 25134 at commit
Test build #108030 has finished for PR 25134 at commit
Test build #108361 has finished for PR 25134 at commit
Jenkins, retest this please.
Test build #108370 has finished for PR 25134 at commit
retest this please
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
Test build #108374 has finished for PR 25134 at commit
Test build #108393 has finished for PR 25134 at commit
if (!isSplitable(path)) {
  Some("the file is compressed by unsplittable compression codec")
} else {
  None
when will we hit this branch?
Removed all the branches that return `None` and added an assert.
Test build #108476 has finished for PR 25134 at commit
thanks, merging to master!
 * If a file with `path` is unsplittable, return the unsplittable reason,
 * otherwise return `None`.
 */
def getFileUnSplittableReason(path: Path): String = {
@cloud-fan, is it really worth exposing another internal API in our common source trait?
We have `isSplittable`, and it makes sense to explain why it's unsplittable. Maybe there is a way to merge these two methods, but I can't think of one now.
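For context, a minimal sketch of how the two methods could sit side by side in the source trait (an assumed shape for illustration, not the exact Spark code):

```scala
import org.apache.hadoop.fs.Path

// Assumed shape: the reason accessor complements isSplitable and is only
// meaningful when the file is not splittable, hence the assert.
trait FileSourceSplitSketch {
  def isSplitable(path: Path): Boolean

  def getFileUnSplittableReason(path: Path): String = {
    assert(!isSplitable(path))
    "the file is compressed by unsplittable compression codec"
  }
}
```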
.doc("When spark loading one single large file, if file size exceed this " + | ||
"threshold, then log warning with possible reasons.") | ||
.longConf | ||
.createWithDefault(1024 * 1024 * 1024) |
I don't think it's worth adding a config. It looks like overkill.
- this is an internal config.
- "large file" is vague, and I don't think we can hardcode a value and say that's "large file".
The problem is, this warning stuff is trivial and not actually important.
We can just pick any reasonable number. Who will configure this? I won't. This information shouldn't be job-based, either.
people may set it to Long.Max to disable the warning. Besides, an internal config doesn't hurt. I think we have many internal configs that users will never set.
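For instance (illustrative only; the config is internal and the value is arbitrary), a user could effectively disable the warning like this:

```scala
import org.apache.spark.SparkConf

// Raise the threshold to Long.MaxValue so the warning never fires.
val conf = new SparkConf()
  .set("spark.io.warning.largeFileThreshold", Long.MaxValue.toString)
```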
if (Utils.isFileSplittable(path, codecFactory)) {
  logWarning(s"Loading one large file ${path.toString} with only one partition, " +
    s"we can increase partition numbers by the `minPartitions` argument in method " +
    "`sc.textFile`")
Is it always `sc.textFile`? Many datasource V1 implementations still often use `hadoopFile` or `newHadoopFile`.
if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
  val codecFactory = new CompressionCodecFactory(jobConf)
  if (Utils.isFileSplittable(path, codecFactory)) {
    logWarning(s"Loading one large file ${path.toString} with only one partition, " +
nit: `toString` isn't needed here, since it's string interpolation.
val codecFactory = new CompressionCodecFactory(jobConf)
if (Utils.isFileSplittable(path, codecFactory)) {
  logWarning(s"Loading one large file ${path.toString} with only one partition, " +
    s"we can increase partition numbers by the `minPartitions` argument in method " +
nit: and the `s` prefix isn't needed either.
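Applying both nits, the call could read roughly like this (a sketch only):

```scala
// `$path` already invokes toString via interpolation, and the middle literal
// contains no interpolated values, so it needs no `s` prefix.
logWarning(s"Loading one large file $path with only one partition, " +
  "we can increase partition numbers by the `minPartitions` argument in method " +
  "`sc.textFile`")
```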
@cloud-fan, this looks like overkill. Can we simply mention it in DataFrame(Reader|Writer)/DataStream(Reader|Writer) for our datasources? For Hadoop ones, the Hadoop input format or somewhere else should describe it.
even if we document it, how can they know what the codec of the data files is? It's better to give a warning before running a foreseeably long job.
They will specify it in
private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
  ConfigBuilder("spark.io.warning.largeFileThreshold")
    .internal()
    .doc("When spark loading one single large file, if file size exceed this " +
Please update the description to
If the size in bytes of a file loaded by Spark exceeds this threshold, a warning is logged with the possible reasons.
.internal()
.doc("When spark loading one single large file, if file size exceed this " +
  "threshold, then log warning with possible reasons.")
.longConf
Please update it to .bytesConf(ByteUnit.BYTE)
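Putting both suggestions together, the config could end up roughly like this (a sketch, assuming it sits alongside the other entries in the internal config package object where `ConfigBuilder` and `ByteUnit` are in scope; the final form is in the follow-up commit below):

```scala
// Sketch combining both review suggestions: reworded doc text and bytesConf.
private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
  ConfigBuilder("spark.io.warning.largeFileThreshold")
    .internal()
    .doc("If the size in bytes of a file loaded by Spark exceeds this threshold, " +
      "a warning is logged with the possible reasons.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(1024 * 1024 * 1024)
```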
…HRESHOLD

### What changes were proposed in this pull request?
Improve conf `IO_WARNING_LARGEFILETHRESHOLD` (a.k.a. `spark.io.warning.largeFileThreshold`):
* reword documentation
* change type from `long` to `bytes`

### Why are the changes needed?
Improvements according to #25134 (comment) & #25134 (comment).

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Pass Jenkins.

Closes #26691 from Ngone51/SPARK-28366-followup.

Authored-by: wuyi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Logging in the driver when loading a single large unsplittable file via `sc.textFile` or the CSV/JSON datasources. The condition triggering the logging is the file size exceeding `spark.io.warning.largeFileThreshold` (default value is 1GB).

How was this patch tested?
Manual test. Generate one gzip file exceeding 1GB, then launch spark-shell and run each of the read commands; every run prints a warning log in the driver.
JSON and Text datasources were also tested with similar cases.
Please review https://spark.apache.org/contributing.html before opening a pull request.
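A hypothetical repro of the manual test described above (the path is made up, and the log line is paraphrased from the messages in this PR's diffs):

```scala
// Reading a >1GB gzip file yields a single partition, which should trigger
// the new driver warning added by this PR.
val rdd = sc.textFile("/data/events-2019.json.gz")
rdd.count()
// Expected driver log, roughly:
// WARN ... Loading one large unsplittable file /data/events-2019.json.gz with only one
// partition, the reason is: the file is compressed by unsplittable compression codec
```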